训练你的第一个 MetaSpore 模型
准备环境
AlphaIDE 提供了一个免费体验 Demo 环境,可以从 AlphaIDE 注册登录 这里免费注册使用。
准备数据
我们使用公开数据集 Terabyte Click Logs 作为示例训练数据。
我们对数据做了千分之一的采样以加快训练速度,具体数据细节可以查看:MetaSpore Demo Dataset.
在 Notebook 中运行以下代码,下载 Demo 数据集。这份数据大约占用 2.1GB 的空间,下载可能需要几分钟时间。如果下载失败,可以访问 MetaSpore Demo Dataset 并手工下载数据.
import metaspore
metaspore.demo.download_dataset()
检查数据是否已经下载成功:
!ls -l ${PWD}/data/
将这份数据上传到 S3:
- 将
{YOUR_S3_BUCKET}
和{YOUR_S3_PATH}
变量修改为执行环境中具体的桶名和需要写入数据的路径; - 去掉开头的注释
#
; - 执行这行代码。
#!aws s3 cp --recursive ${PWD}/data/ s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/data/
在 Jupyter 中,也可以点击菜单栏中 File
-> New
-> Terminal
打开一个终端,执行上面这行命令。
可以使用如下命令检查 S3 数据是否已经上传成功:
#!aws s3 ls s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/data/
schema
目录包含了运行所需的特征组合配置文件,也需要上传到 S3。
#!aws s3 cp --recursive ${PWD}/schema/ s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/schema/
以下我们假设数据目录在 s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/data/
, schema
目录在 s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/schema/
。
S3_ROOT_DIR = 's3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/'
定义模型结构
我们通过继承 PyTorch 的 torch.nn.module
来定义神经网络模型。下面的 DemoModule
提供了一个示例:
和一般的 PyTorch 模型相比,主要的区别是 _sparse
层,需要通过 ms.EmbeddingSumConcat
来定义。需要传入 embedding 向量长度和 schema 目录下的两个配置文件,即列名和特征组合规则。通过 ms.EmbeddingSumConcat
,MetaSpore 可以支持大规模的稀疏 Embedding 层参数的学习。
_schema_dir
变量即是上面我们上传到 S3 的 schema 目录。
import torch
import metaspore as ms
class DemoModule(torch.nn.Module):
def __init__(self):
super().__init__()
self._embedding_size = 16
self._schema_dir = S3_ROOT_DIR + 'demo/schema/'
self._column_name_path = self._schema_dir + 'column_name_demo.txt'
self._combine_schema_path = self._schema_dir + 'combine_schema_demo.txt'
self._sparse = ms.EmbeddingSumConcat(self._embedding_size, self._column_name_path, self._combine_schema_path)
self._sparse.updater = ms.FTRLTensorUpdater()
self._sparse.initializer = ms.NormalTensorInitializer(var=0.01)
self._dense = torch.nn.Sequential(
ms.nn.Normalization(self._sparse.feature_count * self._embedding_size),
torch.nn.Linear(self._sparse.feature_count * self._embedding_size, 1024),
torch.nn.ReLU(),
torch.nn.Linear(1024, 512),
torch.nn.ReLU(),
torch.nn.Linear(512, 1),
)
def forward(self, x):
x = self._sparse(x)
x = self._dense(x)
return torch.sigmoid(x)
创建 DemoModule
类型的对象:
module = DemoModule()
训练模型
训练模式时,需要创建一个 ms.PyTorchEstimator
,传入一些必要的参数,包括上面定义的模型结构 module
对象,以及 server 和 worker 的数量等。
model_out_path
参数设置模型的输出目录。
input_label_column_index
参数设置标签(label)列序号,在这个数据集中是第 0 列。
model_out_path = S3_ROOT_DIR + 'demo/output/dev/model_out/'
estimator = ms.PyTorchEstimator(module=module,
worker_count=1,
server_count=1,
model_out_path=model_out_path,
experiment_name='0.1',
input_label_column_index=0)
接下来, 我们创建一个 Spark session,调用 ms.spark.get_session()
并读取数据: ms.input.read_s3_csv()
.
delimiter
参数设置 CSV 文件的分隔符,在这个数据集是 TAB 分隔,所以我们传入'\t'
。
train_dataset_path = S3_ROOT_DIR + 'demo/data/train/day_0_0.001_train.csv'
spark_session = ms.spark.get_session(local=True,
batch_size=100,
worker_count=estimator.worker_count,
server_count=estimator.server_count)
train_dataset = ms.input.read_s3_csv(spark_session, train_dataset_path, delimiter='\t')
最后,我们调用 ms.PyTorchEstimator
的 fit()
方法,就开始了模型训练。训练过程可能会持续几分钟。训练过程中的 loss、auc 等指标会打印出来。
训练完成后会自动将模型保存到 model_out_path
参数指定的目录,并返回一个 model
对象。
model = estimator.fit(train_dataset)
评估模型
我们可以调用 ms.input.read_s3_csv()
加载一个测试数据集,同样使用 \t
分隔:
test_dataset_path = S3_ROOT_DIR + 'demo/data/test/day_0_0.001_test.csv'
test_dataset = ms.input.read_s3_csv(spark_session, test_dataset_path, delimiter='\t')
然后我们调用 model.transform()
方法,对测试数据集进行预测,将会输出一个 DataFrame
对象,在测试数据集上增加两列:rawPrediction
包含了预测的分数,label
列包含实际的标签值。输出的 DataFrame 会保存在 result
对象中。
result = model.transform(test_dataset)
result.show(5)
最后我们使用 Spark ML 自带的二分类评估器 pyspark.ml.evaluation.BinaryClassificationEvaluator
来计算 AUC 指标:
import pyspark
evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(result)
print('test_auc: %g' % test_auc)
所有计算完成后,我们调用 spark_session
的 stop()
方法释放计算资源:
spark_session.stop()
总结
MetaSpore 通过与 Spark DataFrame 打通,可以很方便地读取各类数据源进行训练和预测,和 Spark ML 也能够很好地集成。